In [2]:
import threading
import itertools
import time
import sys
class Signal:
    """Stop flag shared between the supervising thread and the spinner."""

    # The spinner loop polls this attribute and exits once it is falsy.
    # Assigning ``instance.go = False`` shadows the class default for that
    # instance only.
    go = True
def spin(msg, signal):
    """Animate a text spinner on stdout until ``signal.go`` turns falsy.

    Each frame writes one of ``| / - \\`` followed by a space and *msg*,
    then the same number of backspace characters. The flag is checked
    after every 0.1 s pause, so at least one frame is always drawn.

    Args:
        msg: Text shown next to the spinning character.
        signal: Any object with a ``go`` attribute; the loop ends when it
            is falsy.
    """
    stream = sys.stdout
    emit, push = stream.write, stream.flush
    frames = itertools.cycle('|/-\\')
    while True:
        status = next(frames) + ' ' + msg
        width = len(status)
        emit(status)
        push()
        emit('\x08' * width)
        time.sleep(.1)
        if not signal.go:
            break
    # Overwrite the last frame with spaces, then back up over them again.
    emit(' ' * width + '\x08' * width)
def slow_function(delay=3):
    """Simulate a slow blocking call and return a fixed answer.

    Args:
        delay: Seconds to block in ``time.sleep``. Defaults to 3, the
            previously hard-coded wait.

    Returns:
        The integer 42.
    """
    # pretend waiting a long time for I/O
    time.sleep(delay)
    return 42
def supervisor():
    """Run ``slow_function`` while a spinner animates on a second thread.

    Returns:
        Whatever ``slow_function`` returns.

    Raises:
        Any exception raised by ``slow_function``; the spinner thread is
        stopped and joined before it propagates.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    try:
        result = slow_function()
    finally:
        # Always tell the spinner to stop and wait for it, even on error;
        # otherwise its loop would never see the flag change.
        signal.go = False
        spinner.join()
    return result
def main():
    """Run the supervisor and print the value it returns."""
    answer = supervisor()
    print('Answer:', answer)


# Script entry point: only runs when the module is executed directly.
if __name__ == '__main__':
    main()
In [ ]:
import asyncio
import itertools
import sys
@asyncio.coroutine
def spin(msg):
    """Animate a text spinner on stdout until this coroutine is cancelled.

    Each frame writes one of ``| / - \\`` followed by a space and *msg*,
    then the same number of backspace characters, and yields to the event
    loop for 0.1 s.

    Args:
        msg: Text shown next to the spinning character.
    """
    stream = sys.stdout
    emit, push = stream.write, stream.flush
    frames = itertools.cycle('|/-\\')
    while True:
        status = next(frames) + ' ' + msg
        width = len(status)
        emit(status)
        push()
        emit('\x08' * width)
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Cancellation is the stop request: leave the loop and clean up.
            break
    # Overwrite the last frame with spaces, then back up over them again.
    emit(' ' * width + '\x08' * width)
@asyncio.coroutine
def slow_function(delay=3):
    """Simulate a slow I/O operation without blocking the event loop.

    Args:
        delay: Seconds to wait in ``asyncio.sleep``. Defaults to 3, the
            previously hard-coded wait.

    Returns:
        The integer 42.
    """
    # pretend waiting for a long time for I/O
    yield from asyncio.sleep(delay)
    return 42
@asyncio.coroutine
def supervisor():
    """Run ``slow_function`` while the ``spin`` coroutine animates.

    Returns:
        Whatever ``slow_function`` returns.

    Raises:
        Any exception raised by ``slow_function``; the spinner task is
        cancelled before it propagates.
    """
    # ``asyncio.async`` cannot even be parsed on Python 3.7+, where
    # ``async`` is a reserved word; ``ensure_future`` is its replacement.
    spinner = asyncio.ensure_future(spin('thinking!'))
    print('spinner object:', spinner)
    try:
        result = yield from slow_function()
    finally:
        # Cancel on every exit path so no pending task is left behind.
        spinner.cancel()
    return result
def main():
    """Drive ``supervisor`` to completion on the event loop and print its result."""
    event_loop = asyncio.get_event_loop()
    answer = event_loop.run_until_complete(supervisor())
    event_loop.close()
    print('Answer:', answer)


# Script entry point: only runs when the module is executed directly.
if __name__ == '__main__':
    main()
In [1]:
import asyncio
import aiohttp
from flags import BASE_URL, save_flag, show, main
@asyncio.coroutine
def get_flag(cc):
    """Fetch the GIF for country code *cc* and return the response body.

    The URL is built from ``BASE_URL`` and the lower-cased code. The
    response status is not inspected here.
    """
    flag_url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    response = yield from aiohttp.request('GET', flag_url)
    return (yield from response.read())
@asyncio.coroutine
def download_one(cc):
    """Download one flag, report it via ``show``, save it, and return *cc*."""
    flag_bytes = yield from get_flag(cc)
    show(cc)
    target_name = cc.lower() + '.gif'
    save_flag(flag_bytes, target_name)
    return cc
def download_many(cc_list):
    """Download every flag in *cc_list* concurrently.

    One ``download_one`` coroutine is created per code, in sorted order,
    and all of them are awaited together with ``asyncio.wait``.

    Returns:
        The size of the "done" set returned by ``asyncio.wait``.
    """
    event_loop = asyncio.get_event_loop()
    pending = []
    for cc in sorted(cc_list):
        pending.append(download_one(cc))
    finished, _ = event_loop.run_until_complete(asyncio.wait(pending))
    event_loop.close()
    return len(finished)


# Script entry point: hand the downloader to the shared ``main`` driver.
if __name__ == '__main__':
    main(download_many)
In [ ]:
import asyncio
import collections
import aiohttp
from aiohttp import web
import tqdm
from flags2_common import main, HTTPStatus, Result, save_flag
# Default is set low to avoid errors from the remote site, such as
# 503 - Service Temporarily Unavailable.
DEFAULT_CONCUR_REQ = 5
# NOTE(review): presumably the upper limit on concurrent requests that
# main() accepts — confirm in flags2_common.
MAX_CONCUR_REQ = 1000
class FetchError(Exception):
    """Wraps a failure that occurred while fetching one country's flag.

    Attributes:
        country_code: The country code whose download failed.
    """

    def __init__(self, country_code):
        # Forward the code to Exception so it also appears in ``args``,
        # ``str()`` and ``repr()`` instead of leaving them empty.
        super().__init__(country_code)
        self.country_code = country_code
@asyncio.coroutine
def get_flag(base_url, cc):
    """Fetch the GIF for country code *cc* from *base_url*.

    Args:
        base_url: Root URL of the flag server.
        cc: Country code; lower-cased when building the URL.

    Returns:
        The response body when the status is 200.

    Raises:
        web.HTTPNotFound: The server answered 404.
        aiohttp.HttpProcessingError: Any other non-200 status.
    """
    # Use the caller's base_url; this cell defines no BASE_URL global.
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    if resp.status == 200:
        image = yield from resp.read()
        return image
    elif resp.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.HttpProcessingError(
            code=resp.status, message=resp.reason,
            headers=resp.headers)
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Download and save one flag, limited by *semaphore*.

    Args:
        cc: Country code to fetch.
        base_url: Root URL of the flag server.
        semaphore: Held while the request is in flight.
        verbose: When true, print the code and its outcome.

    Returns:
        ``Result(status, cc)`` where status is ``HTTPStatus.ok`` or
        ``HTTPStatus.not_found``.

    Raises:
        FetchError: Any failure other than a 404, chained to the
            original exception.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        # Saved only after the semaphore has been released.
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    # Was ``Status`` (undefined name); the local variable is ``status``.
    return Result(status, cc)
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
    """Download all flags concurrently and tally the outcomes.

    Args:
        cc_list: Country codes to fetch; processed in sorted order.
        base_url: Root URL of the flag server.
        verbose: Print per-error messages when true; otherwise wrap the
            iteration in a tqdm progress bar.
        concur_req: Initial value of the semaphore shared by all downloads.

    Returns:
        A ``collections.Counter`` keyed by status.
    """
    tally = collections.Counter()
    limiter = asyncio.Semaphore(concur_req)
    coros = []
    for cc in sorted(cc_list):
        coros.append(download_one(cc, base_url, limiter, verbose))

    completed = asyncio.as_completed(coros)
    if not verbose:
        completed = tqdm.tqdm(completed, total=len(cc_list))

    for next_done in completed:
        try:
            res = yield from next_done
        except FetchError as exc:
            country_code = exc.country_code
            cause = exc.__cause__
            try:
                error_msg = cause.args[0]
            except IndexError:
                # The cause carries no arguments: use its class name.
                error_msg = cause.__class__.__name__
            if verbose and error_msg:
                print('*** Error for {}: {}'.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        tally[status] += 1

    return tally
def download_many(cc_list, base_url, verbose, concur_req):
    """Run ``downloader_coro`` to completion and return its counter."""
    event_loop = asyncio.get_event_loop()
    counts = event_loop.run_until_complete(
        downloader_coro(cc_list, base_url, verbose, concur_req))
    event_loop.close()
    return counts


# Script entry point: hand the downloader and limits to the shared driver.
if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
In [2]:
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Download one flag and hand the disk write to the default executor.

    Args:
        cc: Country code to fetch.
        base_url: Root URL of the flag server.
        semaphore: Held while the request is in flight.
        verbose: When true, print the code and its outcome.

    Returns:
        ``Result(status, cc)`` where status is ``HTTPStatus.ok`` or
        ``HTTPStatus.not_found``.

    Raises:
        FetchError: Any failure other than a 404, chained to the
            original exception.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        # NOTE(review): the returned future is not awaited, so 'OK' is
        # reported before the write finishes and save errors go unseen.
        loop.run_in_executor(None,
                             save_flag, image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    # Was ``Status`` (undefined name); the local variable is ``status``.
    return Result(status, cc)
In [ ]:
@asyncio.coroutine
def http_get(url):
    """GET *url* and return its body, decoded as JSON when appropriate.

    Returns:
        The result of ``res.json()`` when the Content-type header contains
        'json' or the URL ends with 'json'; otherwise ``res.read()``.

    Raises:
        web.HTTPNotFound: The server answered 404.
        aiohttp.errors.HttpProcessingError: Any other non-200 status.
    """
    res = yield from aiohttp.request('GET', url)

    # Deal with the failure statuses first so the success path is flat.
    if res.status == 404:
        raise web.HTTPNotFound()
    if res.status != 200:
        raise aiohttp.errors.HttpProcessingError(
            code=res.status, message=res.reason,
            headers=res.headers)

    ctype = res.headers.get('Content-type', '').lower()
    wants_json = 'json' in ctype or url.endswith('json')
    if wants_json:
        return (yield from res.json())
    return (yield from res.read())
@asyncio.coroutine
def get_country(base_url, cc):
    """Return the 'country' entry of the metadata.json for code *cc*."""
    metadata_url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
    payload = yield from http_get(metadata_url)
    return payload['country']
@asyncio.coroutine
def get_flag(base_url, cc):
    """Return whatever ``http_get`` yields for the GIF URL of code *cc*."""
    flag_url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    image = yield from http_get(flag_url)
    return image
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Fetch a flag and its country name, then save it under that name.

    The semaphore is acquired separately for each of the two requests.
    The file is named ``<country>.<cc>.gif`` with spaces in the country
    replaced by underscores, and the write is submitted to the default
    executor without waiting for it.

    Returns:
        ``Result(status, cc)`` where status is ``HTTPStatus.ok`` or
        ``HTTPStatus.not_found``.

    Raises:
        FetchError: Any failure other than a 404, chained to the
            original exception.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
        with (yield from semaphore):
            country = yield from get_country(base_url, cc)
    except web.HTTPNotFound:
        status, msg = HTTPStatus.not_found, 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        filename = '{}.{}.gif'.format(country.replace(' ', '_'), cc)
        asyncio.get_event_loop().run_in_executor(
            None, save_flag, image, filename)
        status, msg = HTTPStatus.ok, 'OK'

    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)
In [ ]:
import sys
import asyncio
from charfinder import UnicodeNameIndex #1
# Line terminator appended to each response line written to the client.
CRLF = b'\r\n'
# Prompt written to the client before each query is read.
PROMPT = b'?>'
# Shared index instance that handle_queries searches for every query.
index = UnicodeNameIndex()
@asyncio.coroutine
def handle_queries(reader, writer):
    """Serve one client: prompt, read a query line, send matching lines.

    The session ends when the client closes the connection, sends a line
    that starts with a control character (code point below 32), or sends
    bytes that cannot be decoded.

    Args:
        reader: Stream the query lines are read from.
        writer: Stream the prompt and results are written to; closed on
            exit.
    """
    while True:
        writer.write(PROMPT)  # plain method: not used with yield from
        yield from writer.drain()  # coroutine: needs yield from
        data = yield from reader.readline()
        if not data:
            # readline() returns empty bytes at EOF. Without this check
            # the loop would keep prompting a peer that has gone away.
            break
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:
            # Treat undecodable input as a control character, which ends
            # the session in the check below.
            query = '\x00'
        client = writer.get_extra_info('peername')
        print('Received from {}: {!r}'.format(client, query))
        if query:
            if ord(query[:1]) < 32:
                break
            lines = list(index.find_description_strs(query))
            if lines:
                writer.writelines(line.encode() + CRLF for line in lines)
            writer.write(index.status(query, len(lines)).encode() + CRLF)

            yield from writer.drain()
            print('Sent {} results'.format(len(lines)))

    print('Close the client socket')
    writer.close()
def main(address='127.0.0.1', port=2323):
    """Start the TCP query server and run it until CTRL-C.

    Args:
        address: Interface to bind to.
        port: Port to listen on; converted with ``int`` so a command-line
            string is accepted.
    """
    port = int(port)
    event_loop = asyncio.get_event_loop()
    server = event_loop.run_until_complete(
        asyncio.start_server(handle_queries, address, port,
                             loop=event_loop))

    bound_to = server.sockets[0].getsockname()
    print('Serving on {}. Hit CTRL-C to stop.'.format(bound_to))
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        # CTRL+C pressed: fall through to the shutdown sequence.
        pass

    print('Server shutting down.')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    event_loop.close()


# Script entry point: optional address and port come from the command line.
if __name__ == '__main__':
    main(*sys.argv[1:])
In [ ]:
import asyncio
def home(request):
    """Render the search page for the ``query`` request parameter.

    Args:
        request: Incoming request; the ``query`` GET parameter is read
            and stripped.

    Returns:
        A ``web.Response`` whose text is ``template`` filled with the
        query, the result rows and a status message.
    """
    # Local import: only this handler needs it.
    from html import escape

    query = request.GET.get('query', '').strip()
    print('Query: {!r}'.format(query))
    if query:
        descriptions = list(index.find_descriptions(query))
        res = '\n'.join(ROW_TPL.format(**vars(descr))
                        for descr in descriptions)
        msg = index.status(query, len(descriptions))
    else:
        descriptions = []
        res = ''
        msg = 'Enter words describing characters.'

    # The query is client-controlled and the status message is built from
    # it, so both are escaped before being placed in the page. Unescaped,
    # markup in the query string would be reflected back to the client.
    page = template.format(query=escape(query), result=res,
                           message=escape(msg))
    print('Sending {} results'.format(len(descriptions)))
    return web.Response(content_type=CONTENT_TYPE, text=page)
@asyncio.coroutine
def init(loop, address, port):
    """Create the web application and start listening.

    Registers ``home`` for ``GET /`` and creates a server on *loop* bound
    to *address* and *port*.

    Returns:
        The socket name of the server's first listening socket.
    """
    application = web.Application(loop=loop)
    application.router.add_route('GET', '/', home)
    server = yield from loop.create_server(
        application.make_handler(), address, port)
    listening = server.sockets[0]
    return listening.getsockname()
def main(address="127.0.0.1", port=8888):
    """Start the web server and run the event loop until CTRL-C.

    Args:
        address: Interface to bind to.
        port: Port to listen on; converted with ``int`` so a command-line
            string is accepted.
    """
    port = int(port)
    event_loop = asyncio.get_event_loop()
    bound_to = event_loop.run_until_complete(
        init(event_loop, address, port))
    print('Serving on {}, Hit CTRL-C to stop.'.format(bound_to))
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        # CTRL+C pressed: fall through to the shutdown sequence.
        pass
    print('Server shutting down.')
    event_loop.close()


# Script entry point: optional address and port come from the command line.
if __name__ == '__main__':
    main(*sys.argv[1:])